聊天室后端接口设计
# websocket 服务设计
- 生成一个 websocket 实例,并且设置于 http-serve 为同一个端口监听
// 主文件
const app = express();
// 创建 HTTP 服务器
const server = createServer(app);
// 使用 chatroomRoute
app.use('/mysql', chatroomRoute);
// 传递 Express 应用实例和 HTTP 服务器实例给 WebSocket 逻辑
setupWebSocket(server);
server.listen(7005, () => {
console.log('\x1b[35m%s\x1b[0m', 'mysql server working...');
});
// chatroomRoute 文件
function setupWebSocket(server) {
const rooms = new Map();
const userClientsMap = new Map();
// 创建 WebSocket 服务器,使用传入的 HTTP 服务器
const wss = new WebSocketServer({ server });
// 监听客户端连接事件
wss.on('connection', async (ws, req) => {
// 根据 聊天室id 和 用户id 生成两个 Map集合,分别对于不同的广播对象,一个是广播给聊天室的所有人,一个是单独广播给当前登录的人
// 更新 用户 的状态 为 online
// 进入房间的信息也入表吧
// 广播消息,对象:roomid下的所有用户的websocket,内容: xxx 进入房间了、聊天室在线的人变更
// 监听客户端发送的消息
ws.on('message', async (message) => {
// 根据前端发来的消息的种类 进行 不同的操作
// 如果发过来的是消息,将发送来的消息存入数据库,然后直接广播给同房间下的所有人即可
// 如果是请求最近的200条聊天记录,将该房间下最近200条聊天记录,广播给当前登录的人
});
// 监听客户端断开连接
ws.on('close', async () => {
// 离开房间的信息也入表吧
// 广播消息,对象:roomid下的所有用户的websocket,内容: xxx 离开房间了、聊天室在线的人变更
// 告诉大家 xxx 离开了房间
});
});
console.log('\x1b[35m%s\x1b[0m', 'WebSocket server working...');
}
- 再实现一个用户登录的接口,这个接口会创建或者更新用户信息、聊天室信息
// 创建 或 进入聊天室
router.post('/createChatroom', async (req, res) => {
const { chatroomName, nickName, headNum } = req.body;
try {
const now = dayjs().valueOf();
const nowTime = dayjs().format('YYYY-MM-DD HH:mm:ss');
const room_id = `room_${now}`;
const user_id = `user_${now}`;
const roomTable = `${DB_NAME}.chat_room_table`;
const userTable = `${DB_NAME}.chat_room_user_table`;
// 查看是否存在当前的房间
const queryRoomSql = `SELECT room_id FROM ${roomTable} where room_name='${chatroomName}'`;
const existRoomId = (await runSql(queryRoomSql))[0]?.room_id;
// 创建房间,如果房间有现成的,那就直接用现成的,如果没有就新建一个房间
await insertOrUpdateAfterQuery(
`select 1 from ${roomTable} where room_name='${chatroomName}'`,
`update ${roomTable} set room_status='active' where room_name='${chatroomName}'`,
`insert into ${roomTable} (room_id, room_name, room_status, create_time) values ('${room_id}', '${chatroomName}', 'active', '${nowTime}')`,
);
// 查询当前用户活跃的房间
const queryRoomIdsSql = `SELECT user_id, room_ids FROM ${userTable} where user_name='${nickName}'`;
const user = (await runSql(queryRoomIdsSql))[0];
const userId = user?.user_id;
const roomIds = user?.room_ids?.split(',').filter(Boolean) || [];
const newRoomIds = _.uniq([...roomIds, existRoomId]).join(',');
// 创建用户
await insertOrUpdateAfterQuery(
`select 1 from ${userTable} where user_name='${nickName}'`,
`update ${userTable} set user_status='online',room_ids='${newRoomIds}',head_num='${headNum}' where user_name='${nickName}'`,
`insert into ${userTable} (user_id, user_name, user_status, create_time, room_ids, head_num) values ('${user_id}', '${nickName}', 'online', '${nowTime}', '${newRoomIds}', '${headNum}')`,
);
res.send({
success: true,
data: {
chatroomId: existRoomId || room_id,
nickId: userId || user_id,
},
});
} catch (e) {
console.log(e);
res.send({
error: '创建失败',
});
}
});
# 完整实现代码
点击 展开/收起 完整 代码
/**
* @des 用户列表 相关接口
*/
import express from 'express';
import { WebSocketServer, WebSocket } from 'ws';
import dayjs from 'dayjs';
import _ from 'lodash';
import {
DB_NAME,
isProd,
delay,
runSql,
insertOrUpdateAfterQuery,
sshConfig,
} from './constsES5.mjs';
const router = express.Router();
function parseUrlParams(url) {
const search = _.split(url, '?')[1] || '';
const params = new URLSearchParams(search);
// 将参数转换为对象
const paramsObj = {};
for (const [key, value] of params.entries()) {
paramsObj[key] = value;
}
return paramsObj;
}
// 广播消息
function broadcastToRoom(clients, message) {
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}
const getOnlineMembersByRoomId = async (roomId) => {
const userTable = `${DB_NAME}.chat_room_user_table`;
// 查询在线的人 且 在指定聊天室的人
const queryOnlineMembers = `SELECT user_name, head_num FROM ${userTable} where room_ids like '%${roomId}%' and user_status='online'`;
const onlineMembers = (await runSql(queryOnlineMembers)) || [];
return onlineMembers;
};
const getLogsByRoomId = async (roomId) => {
const logTable = `${DB_NAME}.chat_room_log_table`;
// 查询聊天记录 最近 200 条
const queryLogs = `SELECT log_user_name, log_content, log_time FROM ${logTable} where log_room_id='${roomId}' order by log_time limit 200`;
const logs = await runSql(queryLogs);
return logs;
};
function setupWebSocket(server) {
const rooms = new Map();
const userClientsMap = new Map();
// 创建 WebSocket 服务器,使用传入的 HTTP 服务器
const wss = new WebSocketServer({ server });
// 监听客户端连接事件
wss.on('connection', async (ws, req) => {
const logTable = `${DB_NAME}.chat_room_log_table`;
const userTable = `${DB_NAME}.chat_room_user_table`;
const nowTime = dayjs().format('YYYY-MM-DD HH:mm:ss');
const query = parseUrlParams(req.url);
// 判断 roomId 是否存在
const { roomId, nickId } = query;
if (!roomId) {
ws.close(1008, 'Room ID not provided');
return;
}
// 将连接加入到对应的房间
if (!rooms.has(roomId)) {
rooms.set(roomId, new Set());
}
rooms.get(roomId).add(ws);
// 将连接加入到对应的个人
if (!userClientsMap.has(nickId)) {
userClientsMap.set(nickId, new Set());
}
userClientsMap.get(nickId).add(ws);
const roomIdClients = rooms.get(roomId);
const getUserClients = (nickId) => userClientsMap.get(nickId);
// 更新 用户 的状态 为 online
const queryRoomIdsSql = `SELECT user_id, user_name, room_ids FROM ${userTable} where user_id='${nickId}'`;
const userInfo = (await runSql(queryRoomIdsSql))[0];
const user_name = userInfo?.user_name;
const roomIds = userInfo?.room_ids?.split(',').filter(Boolean) || [];
const newRoomIds = _.uniq([...roomIds, roomId]).join(',');
const joinInMessage = `${user_name} 进入了房间`;
await runSql(
`update ${userTable} set user_status='online', room_ids='${newRoomIds}' where user_id='${nickId}' `,
);
// 进入房间的信息也入库吧
await runSql(
`insert into ${logTable}
(log_id, log_content, log_time, log_room_id, log_user_id, log_user_name)
values
('${roomId}_${nickId}_${nowTime}', '${joinInMessage}', '${nowTime}', '${roomId}', '${nickId}', 'system')
`,
);
const onlineMembers = await getOnlineMembersByRoomId(roomId);
broadcastToRoom(
roomIdClients,
JSON.stringify([
// 告诉大家 xxx 进入了房间
{
type: 'message',
data: {
log_user_name: 'system',
log_content: joinInMessage,
},
},
// 通知大家,聊天室的信息变更了,这里只返回在线的人
{
type: 'members',
data: onlineMembers,
},
]),
);
// 监听客户端发送的消息
ws.on('message', async (message) => {
// 根据消息种类 进行 不同的操作
const messageParse = JSON.parse(message.toString('utf-8'));
const { content, sendUserName, sendUserId, sendRoom, type, sendTime } =
messageParse;
// 如果发过来的是消息,那么直接广播出去即可
if (type === 'message') {
// 将发送来的消息存入数据库
await runSql(
`insert into ${logTable}
(log_id, log_content, log_time, log_room_id, log_user_id, log_user_name)
values
('${sendRoom}_${sendUserId}_${sendTime}', '${content}', '${sendTime}', '${sendRoom}', '${sendUserId}', '${sendUserName}')
`,
);
// 广播消息给同一房间的其他客户端
broadcastToRoom(
roomIdClients,
JSON.stringify([
{
type: 'message',
data: {
log_user_name: sendUserName,
log_content: content,
log_time: sendTime,
},
},
]),
);
}
// 如果是请求最近的200条聊天记录
if (type === 'query-logs') {
// 广播消息给指定的这个用户
const logs = await getLogsByRoomId(sendRoom);
broadcastToRoom(
getUserClients(sendUserId),
JSON.stringify([
{
type: 'logs',
data: logs,
},
]),
);
}
});
// 监听客户端断开连接
ws.on('close', async () => {
const exitTime = dayjs().format('YYYY-MM-DD HH:mm:ss');
const exitMessage = `${user_name} 离开了房间`;
await runSql(
`update ${userTable} set user_status='offline', room_ids='' where user_id='${nickId}' `,
);
// 离开房间的信息也入库吧
await runSql(
`insert into ${logTable}
(log_id, log_content, log_time, log_room_id, log_user_id, log_user_name)
values
('${roomId}_${nickId}_${exitTime}', '${exitMessage}', '${exitTime}', '${roomId}', '${nickId}', 'system')
`,
);
const onlineMembers = await getOnlineMembersByRoomId(roomId);
// 告诉大家 xxx 离开了房间
broadcastToRoom(
roomIdClients,
JSON.stringify([
// 告诉大家 xxx 离开了房间
{
type: 'message',
data: {
log_user_name: 'system',
log_content: exitMessage,
},
},
// 通知大家,聊天室的信息变更了,这里只返回在线的人
{
type: 'members',
data: onlineMembers,
},
]),
);
});
});
console.log('\x1b[35m%s\x1b[0m', 'WebSocket server working...');
}
// 获取 room 的数量
router.get('/getRoomNums', async (req, res) => {
try {
const querySql = `SELECT room_id FROM ${DB_NAME}.chat_room_table`;
const results = await runSql(querySql);
res.send({
success: true,
data: results.length,
});
} catch (e) {
res.send({
success: true,
data: 0,
});
}
});
// 创建 或 进入聊天室
router.post('/createChatroom', async (req, res) => {
const { chatroomName, nickName, headNum } = req.body;
try {
const now = dayjs().valueOf();
const nowTime = dayjs().format('YYYY-MM-DD HH:mm:ss');
const room_id = `room_${now}`;
const user_id = `user_${now}`;
const roomTable = `${DB_NAME}.chat_room_table`;
const userTable = `${DB_NAME}.chat_room_user_table`;
// 查看是否存在当前的房间
const queryRoomSql = `SELECT room_id FROM ${roomTable} where room_name='${chatroomName}'`;
const existRoomId = (await runSql(queryRoomSql))[0]?.room_id;
// 创建房间,如果房间有现成的,那就直接用现成的,如果没有就新建一个房间
await insertOrUpdateAfterQuery(
`select 1 from ${roomTable} where room_name='${chatroomName}'`,
`update ${roomTable} set room_status='active' where room_name='${chatroomName}'`,
`insert into ${roomTable} (room_id, room_name, room_status, create_time) values ('${room_id}', '${chatroomName}', 'active', '${nowTime}')`,
);
// 查询当前用户活跃的房间
const queryRoomIdsSql = `SELECT user_id, room_ids FROM ${userTable} where user_name='${nickName}'`;
const user = (await runSql(queryRoomIdsSql))[0];
const userId = user?.user_id;
const roomIds = user?.room_ids?.split(',').filter(Boolean) || [];
const newRoomIds = _.uniq([...roomIds, existRoomId]).join(',');
// 创建用户
await insertOrUpdateAfterQuery(
`select 1 from ${userTable} where user_name='${nickName}'`,
`update ${userTable} set user_status='online',room_ids='${newRoomIds}',head_num='${headNum}' where user_name='${nickName}'`,
`insert into ${userTable} (user_id, user_name, user_status, create_time, room_ids, head_num) values ('${user_id}', '${nickName}', 'online', '${nowTime}', '${newRoomIds}', '${headNum}')`,
);
res.send({
success: true,
data: {
chatroomId: existRoomId || room_id,
nickId: userId || user_id,
},
});
} catch (e) {
console.log(e);
res.send({
error: '创建失败',
});
}
});
// 判断用户是否正规进入过room
router.post('/validateNickIdByRoomId', async (req, res) => {
const { roomId, nickId } = req.body;
try {
const userTable = `${DB_NAME}.chat_room_user_table`;
// 查看是否存在当前的房间
const queryStatusSql = `SELECT user_status FROM ${userTable} where user_id='${nickId}' and room_ids like '%${roomId}%' `;
const status = (await runSql(queryStatusSql))[0]?.user_status;
if (status === 'online') {
res.send({
success: true,
});
} else {
res.send({
success: false,
});
}
} catch (e) {
res.send({
success: false,
});
}
});
// 根据roomid 查询 room信息
router.post('/getRoomInfo', async (req, res) => {
const { roomId } = req.body;
try {
const roomTable = `${DB_NAME}.chat_room_table`;
// 查看是否存在当前的房间
const querySql = `SELECT room_name FROM ${roomTable} where room_id = '${roomId}' `;
const roomName = (await runSql(querySql))[0]?.room_name;
res.send({
success: true,
data: roomName,
});
} catch (e) {
res.send({
error: '查询报错',
});
}
});
// 根据 userId 查询 用户信息
router.post('/getUserInfo', async (req, res) => {
const { userId } = req.body;
try {
const userTable = `${DB_NAME}.chat_room_user_table`;
// 查看是否存在当前的房间
const querySql = `SELECT user_name FROM ${userTable} where user_id='${userId}' `;
const userName = (await runSql(querySql))[0]?.user_name;
res.send({
success: true,
data: userName,
});
} catch (e) {
res.send({
error: '查询报错',
});
}
});
export { setupWebSocket, router };